package org.zjvis.datascience.common.etl;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.schema.Column;
import net.sf.jsqlparser.statement.create.view.CreateView;
import net.sf.jsqlparser.statement.select.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zjvis.datascience.common.constant.DataJsonConstant;
import org.zjvis.datascience.common.dto.TaskDTO;
import org.zjvis.datascience.common.dto.TaskInstanceDTO;
import org.zjvis.datascience.common.util.JsonParseUtil;
import org.zjvis.datascience.common.util.MapUtil;
import org.zjvis.datascience.common.util.task.TaskInstanceDTOUtil;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * @description ETL-Filter筛选器 工具类
 * @date 2021-12-27
 */
public class FilterHelper {

    private static final String COLS_FILTER = "columnsFilter";
    private static final String COLS_EXISTED = "tableCols";
    private static final String OUTPUT_IDX = "output";
    private static final String INPUT_IDX = "input";

    private final static Logger logger = LoggerFactory.getLogger("FilterHelper");

    public static void updateColumnsFilter(TaskDTO task) {
        JSONObject jsonObject = JSONObject.parseObject(task.getDataJson());
        JSONArray selectedCols = jsonObject.getJSONArray(COLS_FILTER);
        JSONObject output = jsonObject.getJSONArray(OUTPUT_IDX).getJSONObject(0);
        JSONArray outputCols = output.getJSONArray(COLS_EXISTED);
        JSONArray intersectionSet = getIntersectionSet(selectedCols, outputCols);
        jsonObject.put(COLS_FILTER, intersectionSet);
        task.setDataJson(jsonObject.toJSONString());
    }

    private static JSONArray getIntersectionSet(JSONArray target, JSONArray reference) {
        Iterator<Object> iterator = target.stream().iterator();
        List<String> referenceList = reference.toJavaList(String.class);
        JSONArray result = new JSONArray();
        while (iterator.hasNext()) {
            String col = (String) iterator.next();
            if (referenceList.contains(col)) {
                result.add(col);
            }
        }
        return result;
    }


    public static String rebuildSql(String sql, TaskInstanceDTO instance) {
        String FormattedSql = sql.replaceAll("\uFEFF", "").replaceAll("、", "");
        List<SelectItem> newCols = new ArrayList<SelectItem>();
        CreateView statement = null;
        try {
            statement = (CreateView) CCJSqlParserUtil.parse(FormattedSql);
            Select select = (statement).getSelect();
            SelectBody selectBody = select.getSelectBody();

            JSONObject jsonObject = JSONObject.parseObject(instance.getDataJson()).getJSONObject(DataJsonConstant.INSTANCE_JSON_HEADER);
            JSONObject input = jsonObject.getJSONArray(INPUT_IDX).getJSONObject(0);
            List<String> existedCols = input.getJSONArray(COLS_EXISTED).toJavaList(String.class);
            jsonObject.getJSONArray(COLS_FILTER).toJavaList(String.class).stream().forEach(col -> {
                if (existedCols.contains(col)) {
                    newCols.add(new SelectExpressionItem(new Column(col)));
                }
            });

            PlainSelect plainSelect = (PlainSelect) selectBody;
            ((PlainSelect) selectBody).setSelectItems(newCols);
            select.setSelectBody(plainSelect);
            statement.setSelect(select);

            //还要修改datajson里的output数据
            List<String> colNames = newCols.stream().map(SelectItem::toString).collect(Collectors.toList());
            syncColsInfo(colNames, jsonObject);
            TaskInstanceDTOUtil.updateInstanceDataJson(instance, jsonObject);
            //还要更新
        } catch (JSQLParserException e) {
            logger.warn("error happened when parsing sql, since {}", e.getLocalizedMessage());
        }
        return statement.toString();
    }


    /**
     * 更新json中输出结构的结果
     * @param colNames
     * @param dataJson
     */
    private static void syncColsInfo(List<String> colNames, JSONObject dataJson) {
        JsonParseUtil.setValueByKey(colNames, "output[0].tableCols", dataJson);
        List<String> inputCols = JsonParseUtil.getValueByKey("input[0].tableCols", dataJson, JSONArray.class).toJavaList(String.class);
        List<String> inputTypes = JsonParseUtil.getValueByKey("input[0].columnTypes", dataJson, JSONArray.class).toJavaList(String.class);
        Map<String, String> colTypeMap = MapUtil.genMapFromList(inputCols, inputTypes);
        JsonParseUtil.setValueByKey(inputCols, "columnsFilter", dataJson);
        List<String> colTypes = MapUtil.extractValues(colTypeMap, colNames);
        JsonParseUtil.setValueByKey(colTypes, "output[0].columnTypes", dataJson);
    }

    public static String rebuildSqlTest(String sql, String dataJson) {
        sql = sql.replaceAll("\uFEFF", "");
        List<SelectItem> newCols = new ArrayList<SelectItem>();
        CreateView statement = null;
        try {
            statement = (CreateView) CCJSqlParserUtil.parse(sql);
            Select select = (statement).getSelect();
            SelectBody selectBody = select.getSelectBody();

            JSONObject jsonObject = JSONObject.parseObject(dataJson).getJSONObject(DataJsonConstant.INSTANCE_JSON_HEADER);
            JSONObject input = jsonObject.getJSONArray(INPUT_IDX).getJSONObject(0);
            input.getJSONArray(COLS_EXISTED).toJavaList(String.class).stream().forEach(col -> {
                newCols.add(new SelectExpressionItem(new Column(col)));
            });

            PlainSelect plainSelect = (PlainSelect) selectBody;
            ((PlainSelect) selectBody).setSelectItems(newCols);
            select.setSelectBody(plainSelect);
            statement.setSelect(select);

            //还要修改datajson里的output数据
            List<String> colNames = newCols.stream().map(SelectItem::toString).collect(Collectors.toList());
            syncColsInfo(colNames, jsonObject);
            System.out.println("datajson2 -> "+ jsonObject.toJSONString());
        } catch (JSQLParserException e) {
            logger.warn("error happened when parsing sql, since {}", e.getLocalizedMessage());
        }

        System.out.println("dataJson -> "+ dataJson);
        return statement.toString();
    }


    public static void main(String[] args) {
        String json = "{\"inputInfo\":{\"isSample\":\"SUCCESS\",\"subTypeName\":\"ETL/清洗\",\"modelId\":0,\"maxParentNumber\":1,\"forbidden\":false,\"columnsFilter\":[\"\uFEFFname\",\"url\",\"pub_info\",\"评分\",\"评分人数\",\"bv\",\"_record_id_\"],\"algType\":\"1\",\"parentType\":[2],\"output\":[{\"tableCols\":[\"\uFEFFname\",\"url\",\"pub_info\",\"评分\",\"评分人数\",\"bv\",\"_record_id_\"],\"nodeName\":\"FILTER(1)\",\"totalRow\":3061,\"semantic\":{\"评分\":\"null\",\"BV\":\"null\",\"_record_id_\":\"null\",\"播放、弹幕、追番\":\"null\",\"\uFEFFName\":\"null\",\"pub-info\":\"null\",\"投币量\":\"null\",\"URL\":\"null\",\"评分人数\":\"null\"},\"columnTypes\":[\"CHARACTER VARYING\",\"CHARACTER VARYING\",\"CHARACTER VARYING\",\"VARCHAR\",\"VARCHAR\",\"CHARACTER VARYING\",\"INTEGER\"],\"subType\":0,\"tableName\":\"pipeline.view_4245_38290_\"}],\"input\":[{\"tableCols\":[\"\uFEFFname\",\"url\",\"pub_info\",\"bv\",\"播放、弹幕、追番\",\"_record_id_\"],\"nodeName\":\"FILTER\",\"totalRow\":3061,\"semantic\":{\"评分\":\"null\",\"BV\":\"null\",\"_record_id_\":\"null\",\"播放、弹幕、追番\":\"null\",\"\uFEFFName\":\"null\",\"pub-info\":\"null\",\"投币量\":\"null\",\"URL\":\"null\",\"评分人数\":\"null\"},\"columnTypes\":[\"CHARACTER VARYING\",\"CHARACTER VARYING\",\"CHARACTER VARYING\",\"CHARACTER VARYING\",\"CHARACTER VARYING\",\"INTEGER\"],\"subType\":0,\"tableName\":\"pipeline.view_4245_38288_\"}],\"lastTimeStamp\":1637047319307,\"parentTimeStamps\":[1637047311531],\"rowsFilter\":[],\"isSimple\":false,\"subType\":0,\"position\":{\"col\":3,\"row\":2},\"lastStatus\":\"SUCCESS\",\"algName\":\"FILTER\"}}";
        String sql = "CREATE VIEW pipeline.view_4245_38290_1637047787528 AS SELECT \uFEFFname, url, pub_info, 评分, 评分人数, bv, _record_id_ FROM pipeline.view_4245_38288_1637047787528";

        String result = rebuildSqlTest(sql, json);
        System.out.println(result);
    }

}
